Review updated until commit 7283aa8

Relevant files:
- Enhancement: 5 files
- Configuration changes
- Documentation: 1 file
- Tests: 1 file
PR Reviewer Guide
Here are some key observations to aid the review process:
🧪 PR contains tests

⚡ Recommended focus areas for review

Spin loop performance concern: This can cause high CPU usage and may not be optimal for latency-sensitive workloads. Consider whether this should yield to other threads or use a more efficient waiting mechanism.
samnordmann left a comment:
Thank you very much! This looks great.
Here are some comments requesting minor changes or explanations. The only point I'm a bit worried about is that we need a way to make the "wait" non-blocking for the CPU.
```cpp
}

void NixlBackend::cleanup() {
  cleaned_up_ = true;
```
Maybe reuse available_ here?
We can. I added this attribute for traceability: if someone calls getInstance after cleaning up the backend, it's definitely a lifecycle bug, and I thought it deserved its own error message to make that easier to trace, instead of just getting "Backend not available" without knowing it comes from a code mistake.
Fuser/csrc/multidevice/nixl.cpp, lines 465 to 469 in f8a94fc
But your call, let me know what you prefer.
You could still use available_ here to print the same message. I personally think it is harder to reason about several dependent internal states. The question is: "in what situation exactly is it useful to have both available_ and cleaned_up_ state?". I fail to see such a case.
But it's really your call at the end!
Btw, cf. my other comment: the lifecycle should imo be managed by Communicator.
btw, we could even argue that available_ is redundant with impl_ == nullptr
There is a case where impl_ is not null but just points to a trivial impl.
But the right solution here would be simply not to instantiate impl if it cannot be instantiated.
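To make the trade-off concrete, here is a runnable toy sketch (class and method names are hypothetical, not the actual nvfuser code): availability is derived from impl_ being non-null, and cleaned_up_ survives only so that use-after-cleanup gets its own error message.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for the real backend implementation.
struct ToyImpl {};

class ToyBackend {
 public:
  // Availability is derived from impl_ rather than a separate flag.
  bool isAvailable() const { return impl_ != nullptr && !cleaned_up_; }

  void init(bool can_instantiate) {
    // Only instantiate impl_ if it can actually be created.
    if (can_instantiate) {
      impl_ = std::make_unique<ToyImpl>();
    }
  }

  void cleanup() {
    impl_.reset();
    cleaned_up_ = true;
  }

  // cleaned_up_ exists only to distinguish "never available" from
  // "used after cleanup", which is a lifecycle bug worth its own message.
  void use() const {
    if (cleaned_up_) {
      throw std::logic_error("backend used after cleanup");
    }
    if (impl_ == nullptr) {
      throw std::runtime_error("backend not available");
    }
  }

 private:
  std::unique_ptr<ToyImpl> impl_;
  bool cleaned_up_ = false;
};
```

Whether the extra flag is worth the second internal state is exactly the judgment call being discussed here.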
```cpp
  NVF_THROW("Failed to create UCX backend for NIXL agent");
}

// Probe: verify that VRAM (CUDA GPU memory) is actually usable with
```
is that really necessary? It looks suspicious to me, can you help me understand?
It's because a UCX build can lack CUDA support, in which case registerMem silently succeeds but misclassifies VRAM as host memory behind the scenes. The only way to check this is to actually call prepXferDlist on an example tensor (which fails when UCX doesn't have CUDA).
Without this probe, NIXL would be marked as available and registerMem would work, but would then fail much later in prepareTransfer or postTransfer. In my experience, that can be pretty confusing.
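As a toy model of why the probe helps (ToyBackend and its methods are invented stand-ins, not the UCX/NIXL API): registration reports success either way, so only an eager transfer-prep attempt at init time reveals the missing CUDA support.

```cpp
#include <cassert>
#include <string>

// Toy model of the failure mode: a backend whose registerMem always
// "succeeds", but whose transfer preparation fails when CUDA support
// is missing from the underlying transport build.
struct ToyBackend {
  bool has_cuda;

  bool registerMem(const std::string& /*desc*/) {
    return true;  // silently succeeds either way, like UCX without CUDA
  }

  bool prepXfer(const std::string& desc) {
    // VRAM transfers only work if the transport was built with CUDA.
    if (desc == "vram" && !has_cuda) {
      return false;
    }
    return true;
  }
};

// Probe at init time: register a dummy VRAM descriptor and immediately
// try to prepare a transfer with it. If that fails, mark the backend
// unavailable up front instead of failing much later at transfer time.
bool probeVramSupport(ToyBackend& backend) {
  if (!backend.registerMem("vram")) {
    return false;
  }
  return backend.prepXfer("vram");
}
```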
Ok I see, so let's keep this check for now. Is the UCX/NIXL team aware of this bug, and do they plan to fix it?
I opened a PR in nixl: ai-dynamo/nixl#1393
```cpp
#endif
}

void NixlBackend::Impl::waitTransfer(NixlTransferHandle& handle) {
```
This wait function is CPU-blocking, so in practice it is more or less unusable in our context. Do you have an idea how to make this non-blocking for the CPU, and ideally CUDA-graph capturable?
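One common host-side compromise, sketched here with invented names (waitWithBackoff is not part of the backend): spin briefly for low latency, then yield and sleep with exponential backoff to cap CPU usage. This still does not make the wait CUDA-graph capturable; that would require a device-visible completion signal instead of host polling.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

enum class XferStatus { kInProgress, kDone, kError };

// Hypothetical poll function type; in the real backend this would wrap
// getTransferStatus(handle).
using PollFn = XferStatus (*)(void*);

// Spin briefly, then yield and sleep with exponential backoff. Fast
// transfers complete during the hot-spin phase with minimal latency;
// slow ones stop burning a full CPU core.
XferStatus waitWithBackoff(PollFn poll, void* handle) {
  using namespace std::chrono;
  int spins = 0;
  auto sleep_time = microseconds(1);
  const auto max_sleep = microseconds(256);
  while (true) {
    XferStatus s = poll(handle);
    if (s != XferStatus::kInProgress) {
      return s;
    }
    if (spins < 1000) {
      ++spins;  // hot spin for the first iterations
    } else if (sleep_time < max_sleep) {
      std::this_thread::yield();
      std::this_thread::sleep_for(sleep_time);
      sleep_time *= 2;  // exponential backoff, capped
    } else {
      std::this_thread::sleep_for(max_sleep);
    }
  }
}
```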
@x41lakazam Can you provide instructions on how to build nixl, say, from the pjnl docker image? We'll probably need to think about how to add the library to the base image and/or the CI, unless it is already shipped in some DLFW package.
https://nvidia.slack.com/archives/C08KL9MNQ3U/p1771951941351029
Note the build error in the CI.
Greptile Summary

This PR adds a NIXL (NVIDIA Inference Xfer Library) backend to nvfuser's multidevice communication layer, providing a UCX-based one-sided RDMA API for GPU tensor transfers. The backend follows the existing UCC/NCCL pattern.

Key issues found:
Confidence Score: 1/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant App
    participant NixlBackend
    participant NixlBackend_Impl
    participant nixlAgent
    participant TCPStore
    participant Communicator
    App->>NixlBackend: getInstance()
    NixlBackend->>NixlBackend_Impl: Impl::create(communicator)
    NixlBackend_Impl->>nixlAgent: new nixlAgent(name, cfg)
    NixlBackend_Impl->>nixlAgent: createBackend("UCX", params)
    NixlBackend_Impl->>nixlAgent: probe VRAM registration
    NixlBackend_Impl-->>NixlBackend: impl_ (or nullptr if unavailable)
    App->>NixlBackend: registerTensors({t1, t2})
    NixlBackend->>NixlBackend_Impl: registerTensors
    NixlBackend_Impl->>nixlAgent: registerMem(dlist)
    App->>NixlBackend: exchangeMetadata()
    NixlBackend->>NixlBackend_Impl: exchangeMetadata
    NixlBackend_Impl->>nixlAgent: getLocalMD(local_md)
    NixlBackend_Impl->>TCPStore: set("nixl_agent_md_rank_N", local_md)
    loop for each peer rank
        NixlBackend_Impl->>TCPStore: get("nixl_agent_md_rank_peer")
        NixlBackend_Impl->>nixlAgent: loadRemoteMD(remote_md)
    end
    NixlBackend_Impl->>Communicator: barrier()
    NixlBackend_Impl->>TCPStore: deleteKey("nixl_agent_md_rank_N")
    App->>NixlBackend: prepareTransfer(local_descs, remote_descs, peer, op)
    NixlBackend->>NixlBackend_Impl: prepareTransfer
    NixlBackend_Impl->>nixlAgent: createXferReq(op, local_dlist, remote_dlist)
    NixlBackend_Impl-->>App: NixlTransferHandle
    App->>NixlBackend: postTransfer(handle)
    NixlBackend->>NixlBackend_Impl: postTransfer
    NixlBackend_Impl->>nixlAgent: postXferReq(xfer_handle)
    App->>NixlBackend: waitTransfer(handle)
    NixlBackend->>NixlBackend_Impl: waitTransfer (spin-poll)
    NixlBackend_Impl->>nixlAgent: getXferStatus(xfer_handle)
    nixlAgent-->>NixlBackend_Impl: NIXL_SUCCESS
    NixlBackend_Impl-->>App: done
```

Last reviewed commit: 7283aa8
```cpp
// TODO - check this spin loop
NixlXferStatus xfer_status;
do {
  xfer_status = getTransferStatus(handle);
  NVF_ERROR(
      xfer_status != NixlXferStatus::kError,
      "NIXL transfer completed with an error");
} while (xfer_status == NixlXferStatus::kInProgress);
```
The busy-wait spin loop will consume CPU while waiting for transfer completion.
Consider adding std::this_thread::yield() or a small sleep inside the loop to reduce CPU usage.
Suggested change:

```cpp
NixlXferStatus xfer_status;
do {
  xfer_status = getTransferStatus(handle);
  NVF_ERROR(
      xfer_status != NixlXferStatus::kError,
      "NIXL transfer completed with an error");
  if (xfer_status == NixlXferStatus::kInProgress) {
    std::this_thread::yield();
  }
} while (xfer_status == NixlXferStatus::kInProgress);
```
```cpp
void NixlBackend::registerTensors(const std::vector<at::Tensor>& tensors) {
  NVF_CHECK(isAvailable(), "NIXL backend is not available");
  impl_->registerTensors(tensors);
}

void NixlBackend::deregisterTensors(const std::vector<at::Tensor>& tensors) {
  NVF_CHECK(isAvailable(), "NIXL backend is not available");
  impl_->deregisterTensors(tensors);
}

void NixlBackend::exchangeMetadata() {
  NVF_CHECK(isAvailable(), "NIXL backend is not available");
  impl_->exchangeMetadata();
}

NixlTransferHandle NixlBackend::prepareTransfer(
    const std::vector<TensorDesc>& local_descs,
    const std::vector<TensorDesc>& remote_descs,
    int64_t remote_rank,
    NixlXferOp op) {
  NVF_CHECK(isAvailable(), "NIXL backend is not available");
  return impl_->prepareTransfer(local_descs, remote_descs, remote_rank, op);
}

void NixlBackend::postTransfer(NixlTransferHandle& handle) {
  NVF_CHECK(isAvailable(), "NIXL backend is not available");
  impl_->postTransfer(handle);
}

NixlXferStatus NixlBackend::getTransferStatus(
    const NixlTransferHandle& handle) const {
  NVF_CHECK(isAvailable(), "NIXL backend is not available");
  return impl_->getTransferStatus(handle);
}

void NixlBackend::waitTransfer(NixlTransferHandle& handle) {
  NVF_CHECK(isAvailable(), "NIXL backend is not available");
  impl_->waitTransfer(handle);
}
```
Compile error when USE_NIXL is not defined
All seven NixlBackend public API methods (lines 432–471) unconditionally call through to impl_->registerTensors(...), impl_->deregisterTensors(...), impl_->exchangeMetadata(), impl_->prepareTransfer(...), impl_->postTransfer(...), impl_->getTransferStatus(...), and impl_->waitTransfer(...).
When USE_NIXL is not defined, NixlBackend::Impl is the empty stub class NixlBackend::Impl {}; (line 403). None of those methods exist on the empty class, so every delegating call is a hard compile error:
```
error: 'class nvfuser::NixlBackend::Impl' has no member named 'registerTensors'
```
Because nixl.cpp is added to NVFUSER_SRCS unconditionally (CMakeLists.txt line 251), any build with NVFUSER_STANDALONE_BUILD_WITH_NIXL=OFF (the default) will fail to compile.
The fix is to either guard these wrapper methods with #ifdef USE_NIXL, or add stub declarations to the #else branch of the Impl definition so that the calls are still valid (and simply unreachable at runtime due to the NVF_CHECK(isAvailable(), ...) guard):
```cpp
#else // !USE_NIXL
class NixlBackend::Impl {
 public:
  void registerTensors(const std::vector<at::Tensor>&) {}
  void deregisterTensors(const std::vector<at::Tensor>&) {}
  void exchangeMetadata() {}
  NixlTransferHandle prepareTransfer(
      const std::vector<TensorDesc>&,
      const std::vector<TensorDesc>&,
      int64_t,
      NixlXferOp) {
    return {};
  }
  void postTransfer(NixlTransferHandle&) {}
  NixlXferStatus getTransferStatus(const NixlTransferHandle&) const {
    return NixlXferStatus::kError;
  }
  void waitTransfer(NixlTransferHandle&) {}
};
#endif // USE_NIXL
```

```cpp
  NVF_ERROR(
      status == NIXL_SUCCESS,
      "NIXL loadRemoteMD failed for rank ",
      rank,
      " with status ",
      static_cast<int>(status));
}

// Barrier before deleting keys so no rank reads a deleted key.
communicator_.barrier();
```
Deadlock if loadRemoteMD fails on any rank
If agent_->loadRemoteMD(...) returns a non-success status, NVF_ERROR at line 284 throws an exception on the failing rank. That rank never reaches the communicator_.barrier() at line 293. All other ranks, however, proceed to call barrier() and block indefinitely, producing a distributed deadlock.
The barrier here is intended to prevent premature key deletion, but the error path bypasses it. Consider either:
- Catching the error, reaching the barrier (to unblock peers), and re-throwing afterward, or
- Using a try/catch or a status variable to ensure barrier() is always called before propagating errors:
```cpp
nixl_status_t failed_status = NIXL_SUCCESS;
int64_t failed_rank = -1;
for (int64_t rank = 0; rank < world_size; ++rank) {
  if (rank == my_rank) continue;
  auto bytes = store->get(md_key_prefix + std::to_string(rank));
  nixl_blob_t remote_md(bytes.begin(), bytes.end());
  std::string remote_agent_name;
  nixl_status_t status = agent_->loadRemoteMD(remote_md, remote_agent_name);
  if (status != NIXL_SUCCESS && failed_status == NIXL_SUCCESS) {
    failed_status = status;
    failed_rank = rank;
  }
}
// Always reach the barrier so other ranks are not left hanging.
communicator_.barrier();
store->deleteKey(md_key_prefix + std::to_string(my_rank));
NVF_ERROR(
    failed_status == NIXL_SUCCESS,
    "NIXL loadRemoteMD failed for rank ", failed_rank,
    " with status ", static_cast<int>(failed_status));
metadata_exchanged_ = true;
```

```cpp
  c10d::TCPStore* getTcpStore() {
    return store_.get();
  }
}
```
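The defer-and-rethrow pattern from the loadRemoteMD deadlock discussion above can be modeled as a runnable toy (toyLoadRemoteMD and the barrier_reached flag are stand-ins for agent_->loadRemoteMD and communicator_.barrier()): record the first failure, finish the loop, reach the barrier, and only then throw.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Toy stand-in for agent_->loadRemoteMD: rank 2 fails.
int toyLoadRemoteMD(int rank) {
  return rank == 2 ? -1 : 0;
}

// Record the first failure but keep iterating, reach the barrier
// (modeled here by setting *barrier_reached), and only then propagate
// the error. This way no peer is left blocking in the real barrier.
void toyExchangeMetadata(int my_rank, int world_size, bool* barrier_reached) {
  int failed_status = 0;
  int failed_rank = -1;
  for (int rank = 0; rank < world_size; ++rank) {
    if (rank == my_rank) continue;
    int status = toyLoadRemoteMD(rank);
    if (status != 0 && failed_status == 0) {
      failed_status = status;
      failed_rank = rank;
    }
  }
  *barrier_reached = true;  // stand-in for communicator_.barrier()
  if (failed_status != 0) {
    throw std::runtime_error(
        "loadRemoteMD failed for rank " + std::to_string(failed_rank));
  }
}
```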
Missing indentation on closing brace
The closing brace of getTcpStore() at line 129 has lost its 2-space class-body indentation, which is inconsistent with every other method in this class.
Suggested change:

```cpp
  c10d::TCPStore* getTcpStore() {
    return store_.get();
  }
```
No description provided.